/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.streampark.console.core.entity;

import com.baomidou.mybatisplus.annotation.*;
import com.baomidou.mybatisplus.core.toolkit.support.SFunction;
import com.fasterxml.jackson.annotation.JsonIgnore;
import lombok.Data;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.streampark.common.Constant;
import org.apache.streampark.common.enums.*;
import org.apache.streampark.console.core.enums.FlinkAppStateEnum;
import org.apache.streampark.console.core.enums.ReleaseStateEnum;
import org.apache.streampark.console.core.enums.ResourceFromEnum;

import java.io.Serializable;
import java.util.Date;
import java.util.Objects;

@Data
@TableName("t_flink_app")
@Slf4j
public class Application implements Serializable {

    @TableId(type = IdType.AUTO)
    private Long id;

    private Long teamId;

    /**
     * 1) custom code 2) flink SQL
     */
    private Integer jobType;

    private Long projectId;
    /**
     * creator
     */
    private Long userId;

    /**
     * The name of the frontend and program displayed in yarn
     */
    private String jobName;

    @TableField(updateStrategy = FieldStrategy.IGNORED)
    private String appId;

    @TableField(updateStrategy = FieldStrategy.IGNORED)
    private String jobId;

    /**
     * The address of the jobmanager, that is, the direct access address of the Flink web UI
     */
    @TableField(updateStrategy = FieldStrategy.IGNORED)
    private String jobManagerUrl;

    /**
     * flink version
     */
    private Long versionId;

    /**
     * k8s cluster id
     */
    private String clusterId;

    /**
     * flink docker base image
     */
    private String flinkImage;

    /**
     * The resource name of the flink job on k8s, equivalent to clusterId in application mode.
     */
    private String k8sName;

    /**
     * k8s namespace
     */
    private String k8sNamespace = Constant.DEFAULT;

    /**
     * The exposed type of the rest service of K8s(kubernetes.rest-service.exposed.type)
     */
    private Integer k8sRestExposedType;
    /**
     * flink kubernetes pod template
     */
    private String k8sPodTemplate;

    private String k8sJmPodTemplate;
    private String k8sTmPodTemplate;

    @Getter
    private String ingressTemplate;
    private String defaultModeIngress;

    /**
     * flink-hadoop integration on flink-k8s mode
     */
    private Boolean k8sHadoopIntegration;

    private Integer state;
    /**
     * task release status
     */
    @TableField("`release`")
    private Integer release;

    /**
     * determine if a task needs to be built
     */
    private Boolean build;

    /**
     * max restart retries after job failed
     */
    @TableField(updateStrategy = FieldStrategy.IGNORED)
    private Integer restartSize;

    /**
     * has restart count
     */
    private Integer restartCount;

    private Integer optionState;

    /**
     * alert id
     */
    @TableField(updateStrategy = FieldStrategy.IGNORED)
    private Long alertId;

    private String args;
    /**
     * application module
     */
    private String module;

    private String options;

    @TableField(updateStrategy = FieldStrategy.IGNORED)
    private String hotParams;

    private Integer resolveOrder;
    private Integer executionMode;
    private String dynamicProperties;
    private Integer appType;

    /**
     * determine if tracking status
     */
    private Integer tracking;

    private String jar;

    /**
     * for upload type tasks, checkSum needs to be recorded whether it needs to be republished after
     * the update and modify.
     */
    private Long jarCheckSum;

    private String mainClass;

    private Date startTime;

    @TableField(updateStrategy = FieldStrategy.IGNORED)
    private Date endTime;

    private Long duration;

    /**
     * checkpoint max failure interval
     */
    private Integer cpMaxFailureInterval;

    /**
     * checkpoint failure rate interval
     */
    private Integer cpFailureRateInterval;

    /**
     * Actions triggered after X minutes failed Y times: 1: send alert 2: restart
     */
    private Integer cpFailureAction;

    /**
     * overview
     */
    @TableField("TOTAL_TM")
    private Integer totalTM;

    @TableField("HADOOP_USER")
    private String hadoopUser;

    private Integer totalSlot;
    private Integer availableSlot;
    private Integer jmMemory;
    private Integer tmMemory;
    private Integer totalTask;

    /**
     * the cluster id bound to the task in remote mode
     */
    @TableField(updateStrategy = FieldStrategy.IGNORED)
    private Long flinkClusterId;

    private String description;

    private Date createTime;

    private Date optionTime;

    private Date modifyTime;

    /**
     * 1: cicd (build from csv) 2: upload (upload local jar job)
     */
    private Integer resourceFrom;

    private String tags;

    private Boolean probing = false;

    /**
     * running job
     */
//    private transient JobsOverview.Task overview;

    private transient String teamResource;
    private transient String dependency;
    private transient Long sqlId;
    private transient String flinkSql;

    private transient Integer[] stateArray;
    private transient Integer[] jobTypeArray;
    private transient Boolean backUp = false;
    private transient Boolean restart = false;
    private transient String userName;
    private transient String nickName;
    private transient String config;
    private transient Long configId;
    private transient String flinkVersion;
    private transient String confPath;
    private transient Integer format;
    private transient String savePoint;
    private transient Boolean savePointed = false;
    private transient Boolean drain = false;
    private transient Boolean nativeFormat = false;
    private transient Long savePointTimeout = 60L;
    private transient Boolean allowNonRestored = false;
    private transient Integer restoreMode;
    private transient String socketId;
    private transient String projectName;
    private transient String createTimeFrom;
    private transient String createTimeTo;
    private transient String backUpDescription;
    private transient String yarnQueue;

    /**
     * Flink Web UI Url
     */
    private transient String flinkRestUrl;

    /**
     *
     */
    private transient Integer buildStatus;


    public void setDefaultModeIngress(String defaultModeIngress) {
        this.defaultModeIngress = defaultModeIngress;
    }


    public void setState(Integer state) {
        this.state = state;
        this.tracking = shouldTracking() ? 1 : 0;
    }


    /**
     * Determine if a FlinkAppState requires tracking.
     *
     * @return 1: need to be tracked | 0: no need to be tracked.
     */
    public Boolean shouldTracking() {
        switch (getStateEnum()) {
            case ADDED:
            case CREATED:
            case FINISHED:
            case FAILED:
            case CANCELED:
            case TERMINATED:
            case POS_TERMINATED:
                return false;
            default:
                return true;
        }
    }

    /**
     * Determine whether the application can be started to prevent repeated starts.
     *
     * @return true: can start | false: can not start.
     */
    public boolean isCanBeStart() {
        switch (getStateEnum()) {
            case ADDED:
            case CREATED:
            case FAILED:
            case CANCELED:
            case FINISHED:
            case LOST:
            case TERMINATED:
            case SUCCEEDED:
            case KILLED:
            case POS_TERMINATED:
                return true;
            default:
                return false;
        }
    }

    @JsonIgnore
    public ReleaseStateEnum getReleaseState() {
        return ReleaseStateEnum.of(release);
    }

    @JsonIgnore
    public FlinkDevelopmentMode getDevelopmentMode() {
        return FlinkDevelopmentMode.of(jobType);
    }

    @JsonIgnore
    public FlinkAppStateEnum getStateEnum() {
        return FlinkAppStateEnum.of(state);
    }

    @JsonIgnore
    public FlinkK8sRestExposedType getK8sRestExposedTypeEnum() {
        return FlinkK8sRestExposedType.of(this.k8sRestExposedType);
    }

    @JsonIgnore
    public FlinkExecutionMode getFlinkExecutionMode() {
        return FlinkExecutionMode.of(executionMode);
    }

    public boolean cpFailedTrigger() {
        return this.cpMaxFailureInterval != null
                && this.cpFailureRateInterval != null
                && this.cpFailureAction != null;
    }


    /**
     * Local compilation and packaging working directory
     */

    /**
     * Automatically identify remoteAppHome or localAppHome based on app FlinkExecutionMode
     */
    @JsonIgnore
    public String getAppHome() {
        switch (this.getFlinkExecutionMode()) {
            case KUBERNETES_NATIVE_APPLICATION:
            case KUBERNETES_NATIVE_SESSION:
            case YARN_PER_JOB:
            case YARN_SESSION:
            case REMOTE:
            case LOCAL:
            case YARN_APPLICATION:
            default:
                throw new UnsupportedOperationException(
                        "unsupported executionMode ".concat(getFlinkExecutionMode().getName()));
        }
    }

    @JsonIgnore
    public String getAppLib() {
        return getAppHome().concat("/lib");
    }

    @JsonIgnore
    public ApplicationType getApplicationType() {
        return ApplicationType.of(appType);
    }


    @JsonIgnore
    public boolean isFlinkSqlJob() {
        return FlinkDevelopmentMode.FLINK_SQL.getMode().equals(this.getJobType());
    }

    @JsonIgnore
    public boolean isFlinkSqlJobOrPyFlinkJob() {
        return FlinkDevelopmentMode.FLINK_SQL.getMode().equals(this.getJobType())
                || FlinkDevelopmentMode.PYFLINK.getMode().equals(this.getJobType());
    }

    @JsonIgnore
    public boolean isCustomCodeJob() {
        return FlinkDevelopmentMode.CUSTOM_CODE.getMode().equals(this.getJobType());
    }

    @JsonIgnore
    public boolean isCustomCodeOrPyFlinkJob() {
        return FlinkDevelopmentMode.CUSTOM_CODE.getMode().equals(this.getJobType())
                || FlinkDevelopmentMode.PYFLINK.getMode().equals(this.getJobType());
    }

    @JsonIgnore
    public boolean isUploadJob() {
        return isCustomCodeOrPyFlinkJob()
                && ResourceFromEnum.UPLOAD.getValue().equals(this.getResourceFrom());
    }

    @JsonIgnore
    public boolean isCICDJob() {
        return isCustomCodeOrPyFlinkJob()
                && ResourceFromEnum.CICD.getValue().equals(this.getResourceFrom());
    }

    public boolean isStreamParkJob() {
        return this.getAppType() == ApplicationType.STREAMPARK_FLINK.getType();
    }


    @JsonIgnore
    public boolean isRunning() {
        return FlinkAppStateEnum.RUNNING.getValue() == this.getState();
    }

    @JsonIgnore
    public boolean isNeedRollback() {
        return ReleaseStateEnum.NEED_ROLLBACK.get() == this.getRelease();
    }

    @JsonIgnore
    public boolean isNeedRestartOnFailed() {
        if (this.restartSize != null && this.restartCount != null) {
            return this.restartSize > 0 && this.restartCount <= this.restartSize;
        }
        return false;
    }

    @JsonIgnore
    public StorageType getStorageType() {
        return getStorageType(getExecutionMode());
    }

    public static StorageType getStorageType(Integer execMode) {
        FlinkExecutionMode executionModeEnum = FlinkExecutionMode.of(execMode);
        switch (Objects.requireNonNull(executionModeEnum)) {
            case YARN_APPLICATION:
                return StorageType.HDFS;
            case YARN_PER_JOB:
            case YARN_SESSION:
            case KUBERNETES_NATIVE_SESSION:
            case KUBERNETES_NATIVE_APPLICATION:
            case REMOTE:
                return StorageType.LFS;
            default:
                throw new UnsupportedOperationException("Unsupported ".concat(executionModeEnum.getName()));
        }
    }


    private boolean needFillYarnQueueLabel(FlinkExecutionMode mode) {
        return FlinkExecutionMode.YARN_PER_JOB == mode || FlinkExecutionMode.YARN_APPLICATION == mode;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        return id.equals(((Application) o).id);
    }

    @Override
    public int hashCode() {
        return Objects.hash(id);
    }

    public static class SFunc {
        public static final SFunction<Application, Long> ID = Application::getId;
        public static final SFunction<Application, String> JOB_ID = Application::getJobId;
        public static final SFunction<Application, Date> START_TIME = Application::getStartTime;
        public static final SFunction<Application, Date> END_TIME = Application::getEndTime;
        public static final SFunction<Application, Long> DURATION = Application::getDuration;
        public static final SFunction<Application, Integer> TOTAL_TASK = Application::getTotalTask;
        public static final SFunction<Application, Integer> TOTAL_TM = Application::getTotalTM;
        public static final SFunction<Application, Integer> TOTAL_SLOT = Application::getTotalSlot;
        public static final SFunction<Application, Integer> JM_MEMORY = Application::getJmMemory;
        public static final SFunction<Application, Integer> TM_MEMORY = Application::getTmMemory;
        public static final SFunction<Application, Integer> STATE = Application::getState;
        public static final SFunction<Application, String> OPTIONS = Application::getOptions;
        public static final SFunction<Application, Integer> AVAILABLE_SLOT =
                Application::getAvailableSlot;
        public static final SFunction<Application, Integer> EXECUTION_MODE =
                Application::getExecutionMode;
        public static final SFunction<Application, String> JOB_MANAGER_URL =
                Application::getJobManagerUrl;
    }
}
